package com.gis.cdc;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.calcite.shaded.com.fasterxml.jackson.databind.module.SimpleSerializers;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demo Flink job that reads change events from a MySQL table through the
 * Flink CDC connector and prints each event as a string.
 *
 * <p>Connection settings (host, port, credentials, database and table) are
 * hard-coded for this test program.
 */
public class FlinkCDCTest {
    /**
     * Builds the MySQL CDC source, attaches it to the streaming environment,
     * adds a print sink and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job cannot be submitted or fails while running
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // The explicit <String> type witness ties the builder's element type to the
        // String-producing Debezium deserializer below.
        env.addSource(
                        MySQLSource.<String>builder()
                                .hostname("192.168.231.121")
                                .port(3306)
                                .databaseList("lnnu") // database to capture
                                .tableList("lnnu.sensor_temp") // table to capture, qualified with its database
                                .username("root")
                                .password("00000000")
                                // On startup, begin reading from the beginning.
                                .startupOptions(StartupOptions.initial())
                                // The builder requires a DebeziumDeserializationSchema; a plain Flink
                                // DeserializationSchema such as SimpleStringSchema is not accepted.
                                .deserializer(new StringDebeziumDeserializationSchema())
                                .build())
                .print();

        // Nothing runs until the job graph is submitted.
        env.execute("FlinkCDCTest");
    }
}
